package com.niit.rdd

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object SparkRDD_Transform_02 {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")

    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    /*
    filter:将数据根据指定的规则进行筛选过滤，符合规则的数据保留，不符合规则的数据丢弃。 当
    数据进行筛选过滤后，分区不变，但是分区内的数据可能不均衡，生产环境下，可能会出
    现数据倾斜。
    需求 过滤掉 列表中 对 2 取余 不为 0
     */

    val rdd1 = sc.makeRDD(List(1,2,3,4))
    //val fliterRdd =  rdd1.filter(num => num%2!=0)
    val fliterRdd =  rdd1.filter(_%2!=0)
    fliterRdd.collect().foreach(println)

    /*
    distinct:将数据中的重复数据进行去重
     */
    val rdd2 = sc.makeRDD( List(1,2,3,4,1,2,3,4,5) )
    val disRdd = rdd2.distinct(2)
    println("-----------------------"+disRdd.getNumPartitions)
    disRdd.collect().foreach(println)

    /*
    coalesce:
      根据数据量缩减分区，用于大数据集过滤后，提高小数据集的执行效率
      当 spark 程序中，存在过多的小任务的时候，可以通过 coalesce 方法，收缩合并分区，减
      少分区的个数，减小任务调度成本
     */

    val rdd3 =  sc.makeRDD( List(1,2,3,4,5,6) ,3 )//【1，2】 【3，4】 【5，6】
    /*
    coalesce方法默认情况下不会将分区的数据打乱重新组合
    这种情况下的缩减分区可能会导致数据不均衡，出现数据倾斜
    如果想要让数据均衡，可以进行shuffle处理
     */
    val coaRdd =  rdd3.coalesce(2,true)
    //coaRdd.saveAsTextFile("output1")
    println("----"+coaRdd.getNumPartitions);//【1，2，3】  【4，5，6】

    /*
      repartition:
      该操作内部其实执行的是 coalesce 操作，参数 shuffle 的默认值为 true。无论是将分区数
    多的 RDD 转换为分区数少的 RDD，还是将分区数少的 RDD 转换为分区数多的 RDD，
    repartition 操作都可以完成，因为无论如何都会经 shuffle 过程。
     */
    val rdd4 =  sc.makeRDD( List(1,2,3,4,5,6) ,2 )//【1，2,3】 【4,5,6】
    val reRdd =  rdd4.repartition(3)
    //reRdd.saveAsTextFile("output2")

    /*
    sortBy:该操作用于排序数据。在排序之前，可以将数据通过 f 函数进行处理，之后按照 f 函数处理
    的结果进行排序，默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一
    致。中间存在 shuffle 的过
     */

    val rdd5 = sc.makeRDD( List(6,2,4,5,3,1),2)
    val sortRdd1 =  rdd5.sortBy(num=> num)
    sortRdd1.collect().foreach(println)

    //案例：按规则排序
    val rdd6 = sc.makeRDD(List(("1", 1), ("11", 2), ("2", 3)), 2)
                                                  // 第二个参数为false 为降序 默认升序
    val sortRdd2 = rdd6.sortBy(t=>t._1.toInt,false)
    sortRdd2.collect().foreach(println)

    /*
    交集 并集 差集 拉链
     */
    val rdd7 =  sc.makeRDD( List(1,2,3,4) )
    val rdd8 =  sc.makeRDD( List(3,4,5,6) )
    //交集
    val inRdd =  rdd7.intersection(rdd8)
    println(inRdd.collect().mkString(","))
    //并集
    val unRdd =  rdd7.union(rdd8)
    println(unRdd.collect().mkString(","))
    //差集
    val suRdd =  rdd7.subtract(rdd8)
    println(suRdd.collect().mkString(","))
    //拉链
    val zipRdd =  rdd7.zip(rdd8)
    println(zipRdd.collect().mkString(","))

    /*
    partitionBy:将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner
     */

    val rdd9 = sc.makeRDD( List(1,2,3,4) ,2)
    val tRdd =  rdd9.map((_,1))//=> (1,1) (2,1) (3,1) (4,1)
    val pRdd =  tRdd.partitionBy(new HashPartitioner(3))
    //pRdd.saveAsTextFile("output3")



    sc.stop();

  }

}
